package com.lock.zklock;

import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import com.zk.ZkClientHelper;
import org.apache.curator.utils.ZKPaths;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

/**
 * Zookeepr分布式锁, 这里使用了Curator框架
 * 主要调用ZkClientHelper和ZkDistributedMutex两个核心类
 *
 * <p>
 * ## zookeeper特性：
 * 1. 在Zookeeper中，znode是一个跟Unix文件系统路径相似的节点，可以往这个节点存储或获取数据。
 * 2. 通过客户端，可对znode进行增删改查等操作，还可以注册watcher监控znode的变化。
 * 3. zookeeper的同父子节点不可重名 [ 不同的父节点，子节点可重名]
 * <p>
 * ## zookeeper实现分布式锁原理
 * 1. 加锁：去创建指定名称的节点，如果创建成功，则获得锁（加锁成功）。如果节点已存在，就表示锁被别人获取了
 * 2. 释放锁：删除指定名称的节点
 * 3. zookeeper的同父子节点不可重名
 * 参考：https://blog.csdn.net/koflance/article/details/78616206
 * <p>
 * ## zookeeper节点类型
 * 1. 持久节点（persistent：默认）
 * 2. 持久顺序节点（persistent_sequential）
 * 3. 临时节点（ephemeral）：客户端关闭了就会消失
 * 4. 临时顺序节点 （ephemeral_sequential）:  && 实现分布式锁的关键 &&
 * <p>
 * ## 为什么要临时顺序节点实现分布式锁？
 * 1. 临时节点可以避免[ 在获取锁后释放锁前，重启服务时没有释放zookeeper服务里的锁，从而导致死锁的问题 ]
 * 2. 顺序节点可以避免[ 删除节点时，唤醒所有线程，引发所有线程去抢锁，就像所有海鸟去抢你手中的食物一样，从而导致惊群效应的问题 ]
 * 3. 顺序节点可保证一次唤醒一个线程
 * <p>
 * ## 惊群效应导致什么问题？
 * 1. 巨大的服务性能损耗
 * 2. 网络冲击
 * 3. 可能造成空宕机
 * 参考：https://blog.csdn.net/second60/article/details/81252106
 */
public class ZkLockHelper {
    /**
     * 日志
     */
    private final Logger log = LoggerFactory.getLogger(this.getClass());
    /**
     *
     */
    public static final String PREFIX_MID_PATH = "zklocking2";
    /**
     *
     */
    public static final String CFG_PATH = ZKPaths.makePath(ZkClientHelper.BASE_PATH, PREFIX_MID_PATH);
    /**
     * 存放所有带有ticketID唯一标记区分的锁
     */
    private static final Map<String, ZkDistributedMutex> LOCK_MAP = Maps.newConcurrentMap();
    /**
     * 使用ThreadLocal存储当前线程中的ZkDistributedMutex对象
     */
    private static ThreadLocal<ZkDistributedMutex> threadLocal = new ThreadLocal<ZkDistributedMutex>();


    /**
     * 单例模式私有化构造器
     */
    private ZkLockHelper() {

    }

    /**
     * 静态内部内类实现单例模式
     * 这种方式既满足了不使用就不创建对象和安全性问题
     */
    private static class ZkLockHelperHolder {
        private static ZkLockHelper instance = new ZkLockHelper();
    }

    /**
     * 获取单例对象
     */
    public static ZkLockHelper getInstance() {
        return ZkLockHelperHolder.instance;
    }

    /**
     * 获取互斥锁
     * 在没拿到锁之前，会重复挂起和唤醒当前线程，直到拿到锁
     *
     * @param lockName
     * @return
     * @throws Exception
     */
    public boolean getLock(String lockName) throws Exception {
        Preconditions.checkNotNull(lockName, "lockName should not be null");
        ZkDistributedMutex lock = threadLocal.get();
        if (lock == null) {
            lock = new ZkDistributedMutex(ZkClientHelper.getZkClient(), CFG_PATH, lockName);
            threadLocal.set(lock);
        }
        lock.acquire();
        return true;
    }

    /**
     * 获取互斥锁
     * 在没拿到锁之前，会重复挂起和唤醒当前线程，直到拿到锁
     *
     * @param lockName 锁名称
     * @param ticketID 唯一凭证ID，能保证key不重复
     *                 所以有该参数时，对象保存在ConcurrentHashMap中；没有时是保存在threadLocal中的
     * @return
     * @throws Exception
     */
    public boolean getLock(String lockName, String ticketID) throws Exception {
        Preconditions.checkNotNull(lockName, "lockName should not be null");
        Preconditions.checkNotNull(ticketID, "ticketID should not be null");

        String key = lockName + ZkClientHelper.UNDER_LINE + ticketID;
        ZkDistributedMutex lock = LOCK_MAP.get(key);
        if (lock == null) {
            lock = new ZkDistributedMutex(ZkClientHelper.getZkClient(), CFG_PATH, lockName);
            LOCK_MAP.put(key, lock);
        }
        lock.acquire();
        return true;
    }

    /**
     * 获取互斥锁
     * 第一次没拿到锁时，会在等待指定时间后再尝试一次，再拿不到锁直接返回false
     *
     * @param lockName 锁名称
     * @param time     没拿到锁时再次重试前的等待时间,时间为0就相当于tryLock()
     * @param unit     时间单位，如果不传，则当前线程会被不停的挂起和唤醒直到拿到锁
     * @return
     * @throws Exception
     */
    public boolean getLock(String lockName, long time, TimeUnit unit) throws Exception {
        Preconditions.checkNotNull(lockName, "lockName should not be null");
        // 获取当前线程中拿到的锁：这里也就实现了锁重入
        ZkDistributedMutex lock = threadLocal.get();

        ZkClientHelper.getZkClient();
        if (lock == null) {
            lock = new ZkDistributedMutex(ZkClientHelper.getZkClient(), CFG_PATH, lockName);
        }
        boolean rtn = lock.acquire(time, unit);
        if (rtn) {
            threadLocal.set(lock);
        }
        return rtn;
    }

    /**
     * 获取互斥锁
     * 第一次没拿到锁时，会在等待指定时间后再尝试一次，再拿不到锁直接返回false
     *
     * @param lockName 锁名称
     * @param ticketID 唯一凭证ID，能保证key不重复
     *                 有该参数时，对象保存在ConcurrentHashMap中；没有时是保存在threadLocal中的
     * @param time     没拿到锁时再次重试前的等待时间
     * @param unit     时间单位，如果不传，则当前线程会被不停的挂起和唤醒直到拿到锁
     * @return
     * @throws Exception
     */
    public boolean getLock(String lockName, String ticketID, long time, TimeUnit unit) throws Exception {
        Preconditions.checkNotNull(lockName, "lockName should not be null");
        Preconditions.checkNotNull(ticketID, "ticketID should not be null");

        String key = lockName + ZkClientHelper.UNDER_LINE + ticketID;
        ZkDistributedMutex lock = LOCK_MAP.get(key);
        ZkClientHelper.getZkClient();
        if (lock == null) {
            lock = new ZkDistributedMutex(ZkClientHelper.getZkClient(), CFG_PATH, lockName);
        }
        boolean rtn = lock.acquire(time, unit);
        if (rtn) {
            LOCK_MAP.put(key, lock);
        }
        return rtn;
    }

    /**
     * 释放锁
     *
     * @param lockName 锁名
     */
    public void releaseLock(String lockName) {
        Preconditions.checkNotNull(lockName, "lockName should not be null");
        ZkDistributedMutex lock = threadLocal.get();
        if (lock != null) {
            threadLocal.remove();
            try {
                // 判断当前锁资源是否还在
                if (lock.isAcquiredInThisProcess()) {
                    lock.release();
                }
            } catch (Throwable e) {
                //log.error("releaselock key:{},ERROR:", key, e);
            }
            lock = null;
        }
    }

    /**
     * 释放锁
     *
     * @param lockName 锁名称
     * @param ticketID 唯一凭证ID
     */
    public void releaseLock(String lockName, String ticketID) {
        Preconditions.checkNotNull(lockName, "lockName should not be null");
        Preconditions.checkNotNull(ticketID, "ticketID should not be null");
        String key = lockName + ZkClientHelper.UNDER_LINE + ticketID;
        ZkDistributedMutex lock = LOCK_MAP.remove(key);
        if (lock != null) {
            try {
                if (lock.isAcquiredInThisProcess()) {
                    lock.release();
                }
            } catch (Throwable e) {
                //log.error("releaselock key:{},ERROR:", key, e);
            }
            lock = null;
        }
    }

    /**
     * 判断当前线程是否持有该锁
     *
     * @param lockName 锁名称
     * @return
     * @throws Exception
     */
    public boolean isHoldLock(String lockName) throws Exception {
        Preconditions.checkNotNull(lockName, "lockName should not be null");
        ZkDistributedMutex lock = threadLocal.get();
        return lock == null ? false : lock.isAcquiredInThisProcess();

    }

    /**
     * 判断是否持有该锁
     *
     * @param lockName 锁名称
     * @param ticketID 唯一凭证ID，让key唯一
     * @return
     * @throws Exception
     */
    public boolean isHoldLock(String lockName, String ticketID) throws Exception {
        Preconditions.checkNotNull(lockName, "lockName should not be null");
        Preconditions.checkNotNull(ticketID, "ticketID should not be null");
        String key = lockName + ZkClientHelper.UNDER_LINE + ticketID;
        ZkDistributedMutex lock = LOCK_MAP.get(key);
        return lock == null ? false : lock.isAcquiredInThisProcess();

    }

    /**
     * 释放所有tickID唯一标记的锁
     */
    public synchronized void releaseLockAll() {
        log.warn("release All Lock");
        List<String> keyList = new ArrayList<String>();
        for (String key : LOCK_MAP.keySet()) {
            keyList.add(key);
        }
        for (String key : keyList) {
            try {
                ZkDistributedMutex lock = LOCK_MAP.remove(key);
                if (lock != null) {
                    log.warn("Close LeaderLatch:{}", key);
                    lock.release();
                    lock = null;
                }
            } catch (Throwable e) {
                //log.error("releaseLockAll key:{},ERROR:", key, e);
            }
        }
        LOCK_MAP.clear();
    }

}